---
title: prefect-dask
description: Accelerate your workflows by running tasks in parallel with Dask
---

Dask can run your tasks in parallel and distribute them over multiple machines.
The `prefect-dask` integration makes it easy to accelerate your flow runs with Dask.

## Getting started

### Install `prefect-dask`

The following command will install a version of `prefect-dask` compatible with your installed version of `prefect`.
If you don't already have `prefect` installed, it will install the newest version of `prefect` as well.

<CodeGroup>

```bash pip
pip install "prefect[dask]"
```

```bash uv
uv pip install "prefect[dask]"
```

</CodeGroup>

Upgrade to the latest versions of `prefect` and `prefect-dask`:

<CodeGroup>

```bash pip
pip install -U "prefect[dask]"
```

```bash uv
uv pip install -U "prefect[dask]"
```

</CodeGroup>

## Why use Dask?

Say your flow downloads many images to train a machine learning model.
It takes longer than you'd like for the flow to run because it executes sequentially.

To accelerate your flow code, parallelize it with `prefect-dask` in three steps:

1. Add the import: `from prefect_dask import DaskTaskRunner`
2. Specify the task runner in the flow decorator: `@flow(task_runner=DaskTaskRunner)`
3. Submit tasks to the flow's task runner: `a_task.submit(*args, **kwargs)`

Below is code with and without the DaskTaskRunner:
<Tabs>
<Tab title="Sequential by default">

```python
# Completed in 15.2 seconds

from typing import List
from pathlib import Path

import httpx
from prefect import flow, task

URL_FORMAT = (
    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)

@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # download image from URL
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)

    # save content to directory/YYYYMM.png
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path

@flow
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)

    # download all images
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image(year, month, directory)
        file_paths.append(file_path)
    return file_paths

if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)
```
</Tab>


<Tab title="Parallel with Dask">

```python
# Completed in 5.7 seconds

from typing import List
from pathlib import Path

import httpx
from prefect import flow, task
from prefect_dask import DaskTaskRunner

URL_FORMAT = (
    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)

@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # download image from URL
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)

    # save content to directory/YYYYMM.png
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path

@flow(task_runner=DaskTaskRunner(cluster_kwargs={"processes": False}))
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)

    # download all images
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image.submit(year, month, directory)
        file_paths.append(file_path)
    return file_paths

if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)
```
</Tab>
</Tabs>

In our tests, the flow run took 15.2 seconds to execute sequentially.
Using the `DaskTaskRunner` reduced the runtime to **5.7** seconds!

## Run tasks on Dask

The `DaskTaskRunner` is a [task runner](/v3/develop/task-runners) that submits tasks to the [`dask.distributed`](http://distributed.dask.org/) scheduler.
By default, when the `DaskTaskRunner` is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run.

If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the `address` kwarg.

`DaskTaskRunner` accepts the following optional parameters:

| Parameter | Description |
| --- | --- |
| address | Address of a currently running Dask scheduler. |
| cluster_class | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, `"distributed.LocalCluster"`), or the class itself. |
| cluster_kwargs | Additional kwargs to pass to the `cluster_class` when creating a temporary Dask cluster. |
| adapt_kwargs | Additional kwargs to pass to `cluster.adapt` when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided. |
| client_kwargs | Additional kwargs to use when creating a [`dask.distributed.Client`](https://distributed.dask.org/en/latest/api.html#client). |

<Warning>
**Multiprocessing safety**

Because the `DaskTaskRunner` uses multiprocessing, calls to flows in scripts must be guarded with `if __name__ == "__main__":` or you will encounter warnings and errors.
</Warning>

If you don't provide the `address` of a Dask scheduler, Prefect creates a temporary local cluster automatically.
The number of workers used is based on the number of cores on your machine.
The default provides a mix of processes and threads that work well for most workloads.
To specify this explicitly, pass values for `n_workers` or `threads_per_worker` to `cluster_kwargs`:

```python
from prefect_dask import DaskTaskRunner

# Use 4 worker processes, each with 2 threads
DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
)
```

### Use a temporary cluster

The `DaskTaskRunner` can create a temporary cluster using any of [Dask's cluster-manager options](https://docs.dask.org/en/latest/setup.html).
This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.
To configure it, provide a `cluster_class`.
This can be:

- A string specifying the import path to the cluster class (for example, `"dask_cloudprovider.aws.FargateCluster"`)
- The cluster class itself
- A function for creating a custom cluster

You can also configure `cluster_kwargs`.
This takes a dictionary of keyword arguments to pass to `cluster_class` when starting the flow run.

For example, to configure a flow to use a temporary `dask_cloudprovider.aws.FargateCluster` with four workers running with an image named `my-prefect-image`:

```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)
```

For larger workloads, you can accelerate execution further by distributing task runs over multiple machines.

### Connect to an existing cluster

Multiple Prefect flow runs can use the same existing Dask cluster.
You might manage a single long-running Dask cluster (for example, using the Dask [Helm Chart](https://docs.dask.org/en/latest/setup/kubernetes-helm.html)) and configure flows to connect to it during execution.
This has disadvantages compared to using a temporary Dask cluster:

- All workers in the cluster must have dependencies installed for all flows you intend to run.
- Multiple flow runs may compete for resources. Dask tries to do a good job
sharing resources between tasks, but you may still run into issues.

Still, you may prefer managing a single long-running Dask cluster.

To configure a `DaskTaskRunner` to connect to an existing cluster, pass in the address of the scheduler to the `address` argument:

```python
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address="http://my-dask-cluster"))
def my_flow():
    ...
```

Suppose you have an existing Dask client/cluster such as a `dask.dataframe.DataFrame`.
With `prefect-dask`, it takes just a few steps:

1. Add imports
2. Add `task` and `flow` decorators
3. Use `get_dask_client` context manager to distribute work across Dask workers
4. Specify the task runner and client's address in the flow decorator
5. Submit the tasks to the flow's task runner

<Tabs>
<Tab title="Without Prefect">

```python
import dask.dataframe
import dask.distributed


client = dask.distributed.Client()

def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df


def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:

    df_yearly_avg = df.groupby(df.index.year).mean()
    return df_yearly_avg.compute()


def dask_pipeline():
    df = read_data("1988", "2022")
    df_yearly_average = process_data(df)
    return df_yearly_average


if __name__ == "__main__":
    dask_pipeline()
```
</Tab>

<Tab title="With Prefect">

```python
import dask.dataframe
import dask.distributed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


client = dask.distributed.Client()

@task
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

@task
def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:
    with get_dask_client():
        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()

@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def dask_pipeline():
    df = read_data.submit("1988", "2022")
    df_yearly_average = process_data.submit(df)
    return df_yearly_average


if __name__ == "__main__":
    dask_pipeline()
```

</Tab>
</Tabs>

### Configure adaptive scaling

A key feature of using a `DaskTaskRunner` is the ability to scale adaptively to the workload.
Instead of specifying `n_workers` as a fixed number, you can specify a minimum and maximum number of workers to use, and the Dask cluster scales up and down as needed.

To do this, pass `adapt_kwargs` to `DaskTaskRunner`.
This takes the following fields:

- `maximum` (`int` or `None`, optional): the maximum number of workers to scale to. Set to `None` for no maximum.
- `minimum` (`int` or `None`, optional): the minimum number of workers to scale to. Set to `None` for no minimum.

For example, this configures a flow to run on a `FargateCluster` scaling up to a maximum of 10 workers:

```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10}
)
```

### Use Dask annotations

Use Dask annotations to further control the behavior of tasks.
For example, set the [priority](http://distributed.dask.org/en/stable/priority.html) of tasks in the Dask scheduler:

```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def show(x):
    print(x)


@flow(task_runner=DaskTaskRunner())
def my_flow():
    with dask.annotate(priority=-10):
        future = show.submit(1)  # low priority task

    with dask.annotate(priority=10):
        future = show.submit(2)  # high priority task
```

Another common use case is [resource](http://distributed.dask.org/en/stable/resources.html) annotations:

```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def show(x):
    print(x)

# Create a `LocalCluster` with some resource annotations
# Annotations are abstract in dask and not inferred from your system.
# Here, we claim that our system has 1 GPU and 1 process available per worker
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)

def my_flow():
    with dask.annotate(resources={'GPU': 1}):
        future = show(0)  # this task requires 1 GPU resource on a worker

    with dask.annotate(resources={'process': 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially
        future = show(1)
        future = show(2)
        future = show(3)


if __name__ == "__main__":
    my_flow()
```

## Additional Resources

Refer to the `prefect-dask` [SDK documentation](https://reference.prefect.io/prefect_dask/) to explore all the capabilities of the `prefect-dask` library.

For assistance using Dask, consult the [Dask documentation](https://docs.dask.org/en/stable/)

<Warning>
**Resolving futures in sync client**

Note, by default, `dask_collection.compute()` returns concrete values while `client.compute(dask_collection)` returns Dask Futures. Therefore, if you call `client.compute`, you must resolve all futures before exiting out of the context manager by either:

1. setting `sync=True`
```python
with get_dask_client() as client:
    df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
    summary_df = client.compute(df.describe(), sync=True)
```

2. calling `result()`
```python
with get_dask_client() as client:
    df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
    summary_df = client.compute(df.describe()).result()
```
For more information, visit the docs on [Waiting on Futures](https://docs.dask.org/en/stable/futures.html#waiting-on-futures).
</Warning>

There is also an equivalent context manager for asynchronous tasks and flows: `get_async_dask_client`. When using the async client, you must `await client.compute(dask_collection)` before exiting the context manager.

Note that task submission (`.submit()`) and future resolution (`.result()`) are always synchronous operations in Prefect, even when working with async tasks and flows.